WIP Threadpool merge scheduler sort all merges #120733
private void update(Runnable updater) {
    threadPoolMergeExecutor.updateMergeScheduler(this, (ignored) -> {
        synchronized (this) {
            updater.run();
        }
    });
}
Any update to any merge scheduler first takes the mutex of the single ThreadPoolMergeExecutor instance, and then of the merge scheduler itself.
public synchronized void updateMergeScheduler(
    ThreadPoolMergeScheduler threadPoolMergeScheduler,
    Consumer<ThreadPoolMergeScheduler> updater
) {
    boolean removed = registeredMergeSchedulers.remove(threadPoolMergeScheduler);
    if (false == removed) {
        throw new IllegalStateException("Cannot update a merge scheduler that is not registered");
    }
    currentlyExecutingMergesCount -= threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
    currentlyActiveIOThrottledMergesCount -= getIOThrottledMergeTasksCount(threadPoolMergeScheduler);
    updater.accept(threadPoolMergeScheduler);
    boolean added = registeredMergeSchedulers.add(threadPoolMergeScheduler);
    if (false == added) {
        throw new IllegalStateException("Found duplicate registered merge scheduler");
    }
    currentlyExecutingMergesCount += threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
    currentlyActiveIOThrottledMergesCount += getIOThrottledMergeTasksCount(threadPoolMergeScheduler);
    double newTargetMBPerSec = maybeUpdateTargetMBPerSec();
    if (newTargetMBPerSec != targetMBPerSec) {
        targetMBPerSec = newTargetMBPerSec;
        threadPoolMergeScheduler.setIORateLimitForAllMergeTasks(newTargetMBPerSec);
    }
    maybeExecuteNextMerges();
}
An update to a threadpool merge scheduler has the potential to reorder it in the sorted set.
It also has the potential to change the stats related to IO throttling, because that is done "globally" i.e. across all shards on the node.
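The remove, update, re-add sequence in `updateMergeScheduler` can be illustrated in isolation. This is only a minimal sketch, not the PR's code: `Scheduler`, `nextMergeSize`, and `BY_NEXT_MERGE` are hypothetical stand-ins, and it assumes the registered schedulers live in a `TreeSet` whose comparator reads mutable state. Mutating an element while it sits in the set would leave it at a stale position, so the element is taken out first and re-inserted afterwards.

```java
import java.util.Comparator;
import java.util.TreeSet;
import java.util.function.Consumer;

public class SortedSetReorderSketch {
    // Hypothetical stand-in for a merge scheduler; only the sort key matters here.
    static final class Scheduler {
        final String name;
        long nextMergeSize; // mutable state that the comparator reads

        Scheduler(String name, long nextMergeSize) {
            this.name = name;
            this.nextMergeSize = nextMergeSize;
        }
    }

    // Assumed ordering: smallest pending merge first, name as a stable tie-break.
    static final Comparator<Scheduler> BY_NEXT_MERGE = Comparator
        .comparingLong((Scheduler s) -> s.nextMergeSize)
        .thenComparing(s -> s.name);

    private final TreeSet<Scheduler> registered = new TreeSet<>(BY_NEXT_MERGE);

    synchronized void register(Scheduler s) {
        registered.add(s);
    }

    // Remove before mutating and re-add afterwards, so the set re-sorts the element.
    synchronized void update(Scheduler s, Consumer<Scheduler> updater) {
        if (registered.remove(s) == false) {
            throw new IllegalStateException("Cannot update a scheduler that is not registered");
        }
        updater.accept(s);
        if (registered.add(s) == false) {
            throw new IllegalStateException("Found duplicate registered scheduler");
        }
    }

    synchronized Scheduler first() {
        return registered.first();
    }
}
```

Because every update goes through the single synchronized `update` method, the set is never observed while an element is temporarily absent, which is the same reason the PR funnels updates through the executor's mutex.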
Did a quick read, left some comments/notes.
);
result.put(
    ThreadPool.Names.MERGE,
    new ScalingExecutorBuilder(ThreadPool.Names.MERGE, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5), false)
Given that we throttle anyway, I'd be inclined to allow as many threads here as there are processors.
MergeTask mergeTask2 = tpms2.peekMergeTaskToExecute();
if (mergeTask1 == null && mergeTask2 == null) {
    // arbitrary order between schedulers that cannot run any merge right now
    return System.identityHashCode(mergeTask1) - System.identityHashCode(mergeTask2);
I believe this always returns 0 and wonder if you wanted to compare the tpmses instead?
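To make the observation concrete: `System.identityHashCode(null)` is documented to return zero, so in the branch where both peeked tasks are `null` the subtraction is `0 - 0`. The sketch below reproduces that and shows one possible tie-break on the scheduler objects themselves. The class and method names are hypothetical, and `Object` stands in for the scheduler type. Note that identity hash codes are not guaranteed to be unique, so this ordering can still report two distinct schedulers as equal; a set relying on it would need an additional tie-break such as a unique id.

```java
import java.util.Comparator;

public class IdentityOrderSketch {
    // Mirrors the reviewed branch: both arguments are null when it is reached.
    static int compareNullTasks(Object mergeTask1, Object mergeTask2) {
        // identityHashCode(null) is 0, so this evaluates to 0 - 0.
        return System.identityHashCode(mergeTask1) - System.identityHashCode(mergeTask2);
    }

    // Possible alternative: order the schedulers by their own identity hash.
    // comparingInt avoids the overflow that plain int subtraction can produce.
    static final Comparator<Object> BY_IDENTITY =
        Comparator.comparingInt(System::identityHashCode);
}
```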
    }
}

public synchronized void updateMergeScheduler(
I have some perhaps unqualified anxiety around this central mutex across the node. Probably not a real issue but noting it anyway.
Yeah, this is an issue, but I don't see a better way in the current design.
double newTargetMBPerSec = maybeUpdateTargetMBPerSec();
if (newTargetMBPerSec != targetMBPerSec) {
    targetMBPerSec = newTargetMBPerSec;
    threadPoolMergeScheduler.setIORateLimitForAllMergeTasks(newTargetMBPerSec);
Should this not set it for all merge schedulers or running tasks?
I wonder if we should keep a list of the running tasks in this class so we can easily set it?
That's right, it should be set for all merge schedulers and, as you say, it would be better to set it for the running tasks only (and to set it when they start).
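One way the suggestion in this thread could look, as a rough sketch rather than what the PR or its successor actually does: the executor keeps the set of running tasks, stamps the current limit on each task as it starts, and pushes a changed limit to every running task regardless of which scheduler owns it. `MergeTask`, `onTaskStart`, `onTaskDone`, `updateTarget`, and the initial value of 20.0 are all hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

public class IoRateLimitSketch {
    // Hypothetical stand-in for a running merge task with a per-task rate limit.
    static final class MergeTask {
        volatile double mbPerSec;
    }

    // Tasks from all schedulers on the node; guarded by this object's monitor.
    private final Set<MergeTask> runningTasks = new HashSet<>();
    private double targetMBPerSec = 20.0; // arbitrary starting value for the sketch

    // A task picks up the current node-wide limit at the moment it starts.
    synchronized void onTaskStart(MergeTask task) {
        task.mbPerSec = targetMBPerSec;
        runningTasks.add(task);
    }

    synchronized void onTaskDone(MergeTask task) {
        runningTasks.remove(task);
    }

    // When the target changes, apply it to every running task, not one scheduler's.
    synchronized void updateTarget(double newTargetMBPerSec) {
        if (newTargetMBPerSec != targetMBPerSec) {
            targetMBPerSec = newTargetMBPerSec;
            for (MergeTask task : runningTasks) {
                task.mbPerSec = newTargetMBPerSec;
            }
        }
    }
}
```

Queued tasks never need touching in this arrangement, since they read the limit when they begin running.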
We've ultimately merged a different alternative: #120869
Compared to #120293, this scheduler implementation:
This design reorders merges every time a new one comes in, or an existing one (from any scheduler) finishes, synchronizing on the ThreadPoolMergeExecutor. This complication is mainly a consequence of supporting the "max merge thread" setting, per scheduler (per shard).